package Demo1

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

import java.util.Properties

object C1 {

  /**
   * Entry point: consumes raw log lines from the Kafka topic "javatopic",
   * parses each line into an [[RdFlink]] record, prints the stream for
   * debugging, and writes the records into a Redis hash via the Flink
   * Redis sink.
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")

    // 1. Kafka source, consuming from the earliest available offset.
    val consumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("javatopic", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()

    val ds1: DataStream[String] = env.addSource(consumer)

    // Parse lines of the form "<ip> - - <action>".
    // Malformed lines (fewer than two fields after splitting) are dropped
    // instead of throwing ArrayIndexOutOfBoundsException, which would
    // otherwise fail the whole streaming job on a single bad record.
    val ds2: DataStream[RdFlink] = ds1.flatMap {
      data =>
        val fields: Array[String] = data.split(" - - ")
        if (fields.length >= 2) Seq(RdFlink(fields(0), fields(1))) else Seq.empty
    }

    ds2.print("ds2")

    // Redis (Jedis) connection pool used by the sink.
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("1234").build()

    ds2.addSink(new RedisSink[RdFlink](conf, new ReSinkT))

    env.execute("Jdbc ILKY")
  }

  case class RdFlink(Ip: String, actions: String)

  /**
   * Maps an [[RdFlink]] record onto the Redis command
   * `HSET flink <Ip> <actions>`: each record becomes one field/value pair
   * inside the single hash named "flink".
   */
  class ReSinkT extends RedisMapper[RdFlink] {

    // HSET with the constant additional key "flink" (the hash name).
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "flink")
    }

    // Hash field: the record's IP.
    override def getKeyFromData(t: RdFlink): String = {
      t.Ip
    }

    // Hash value: the record's action string.
    override def getValueFromData(t: RdFlink): String = {
      t.actions
    }
  }

}
